package com.tpvlog.dfs.datanode.client;

import com.alibaba.fastjson.JSONArray;
import com.tpvlog.dfs.datanode.file.DataNodeInfo;
import com.tpvlog.dfs.rpc.service.*;
import io.grpc.ManagedChannel;
import io.grpc.netty.shaded.io.grpc.netty.NegotiationType;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;

import static com.tpvlog.dfs.datanode.config.DataNodeConfig.*;

/**
 * Component that communicates with the NameNode over RPC.
 *
 * @author Ressmix
 */
public class NameNodeRpcClient {

    /** Channel to the NameNode; retained so it can be released by {@link #shutdown()}. */
    private final ManagedChannel channel;

    /** Blocking stub bound to {@link #channel}; every RPC in this class goes through it. */
    private final NameNodeServiceGrpc.NameNodeServiceBlockingStub namenode;

    /**
     * Builds a plaintext (no TLS negotiation) channel to the address given by
     * {@code NAMENODE_HOSTNAME} / {@code NAMENODE_PORT} and wraps it in a blocking stub.
     */
    public NameNodeRpcClient() {
        this.channel = NettyChannelBuilder
                .forAddress(NAMENODE_HOSTNAME, NAMENODE_PORT)
                .negotiationType(NegotiationType.PLAINTEXT)
                .build();
        this.namenode = NameNodeServiceGrpc.newBlockingStub(channel);
    }

    /**
     * Registers this DataNode with the NameNode.
     *
     * <p>The request carries the configured IP, hostname and NIO port. The status of the
     * response is printed to standard output. Failures raised by the stub are not caught here.
     *
     * @return the response returned by the NameNode
     */
    public RegisterResponse register() {
        RegisterRequest request = RegisterRequest.newBuilder()
                .setIp(DATANODE_IP)
                .setHostname(DATANODE_HOSTNAME)
                .setNioPort(NIO_PORT)
                .build();
        RegisterResponse response = namenode.register(request);
        System.out.println("完成向NameNode的注册，响应消息为：" + response.getStatus());

        return response;
    }

    /**
     * Sends a single heartbeat to the NameNode.
     *
     * <p>The request carries the configured IP, hostname and NIO port. Failures raised by the
     * stub are not caught here.
     *
     * @return the response returned by the NameNode
     */
    public HeartbeatResponse heartbeat() {
        HeartbeatRequest request = HeartbeatRequest.newBuilder()
                .setIp(DATANODE_IP)
                .setHostname(DATANODE_HOSTNAME)
                .setNioPort(NIO_PORT)
                .build();
        return namenode.heartbeat(request);
    }

    /**
     * Sends a full report of this DataNode's stored-file metadata to the NameNode.
     *
     * <p>The file name collection is serialized to a JSON string before being placed in the
     * request. When {@code dataNodeInfo} is {@code null} a message is printed and no RPC is made.
     *
     * @param dataNodeInfo the metadata to report; {@code null} means there is nothing to report
     */
    public void fullyReportDataNodeInfo(DataNodeInfo dataNodeInfo) {
        if (dataNodeInfo == null) {
            System.out.println("当前没有存储任何文件，不需要全量上报.....");
            return;
        }
        FullyReportRequest request = FullyReportRequest.newBuilder()
                .setIp(DATANODE_IP)
                .setHostname(DATANODE_HOSTNAME)
                .setFilenameList(JSONArray.toJSONString(dataNodeInfo.getFilenames()))
                .setStoredDataSize(dataNodeInfo.getStoredDataSize())
                .build();
        namenode.fullyReportDataNodeInfo(request);
        System.out.println("全量上报DataNode信息：" + dataNodeInfo);
    }

    /**
     * Sends an incremental report about a single file to the NameNode.
     *
     * @param filename the name of the file being reported
     * @param filesize the size value to report for that file
     *                 (NOTE(review): presumably bytes; confirm against the proto definition)
     */
    public void deltaReportDataNodeInfo(String filename, long filesize) {
        DeltaReportRequest request = DeltaReportRequest.newBuilder()
                .setHostname(DATANODE_HOSTNAME)
                .setIp(DATANODE_IP)
                .setFilename(filename)
                .setFilesize(filesize)
                .build();
        namenode.deltaReportDataNodeInfo(request);
        System.out.println("增量上报DataNode信息：" + filename + ", " + filesize);
    }

    /**
     * Releases the underlying channel by calling {@link ManagedChannel#shutdown()}.
     *
     * <p>Intended to be called once the client is no longer needed (for example when the
     * DataNode stops); without it the channel created by the constructor is never released.
     */
    public void shutdown() {
        channel.shutdown();
    }
}
